/*
 * Dijjer - A Peer to Peer HTTP Cache
 * Copyright (C) 2004,2005 Change.Tv, Inc
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * 
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */
package dijjer.io.comm;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.ListIterator;
import java.util.Set;

import dijjer.AbstractFactory;
import dijjer.Dijjer;
import dijjer.io.BlockInfo;
import dijjer.io.download.Download;
import dijjer.io.store.DataStore;
import dijjer.io.store.HashStore;
import dijjer.io.xfer.BlockReceiver;
import dijjer.io.xfer.BlockTransmitter;
import dijjer.io.xfer.InputStreamBlockReceiver;
import dijjer.io.xfer.PacketThrottle;
import dijjer.io.xfer.PartiallyReceivedBlock;
import dijjer.io.xfer.http.HttpBlockReceiver;
import dijjer.util.AbstractThread;
import dijjer.util.Misc;
import dijjer.util.VeryLongInteger;
import dijjer.util.logging.Logger;

/**
 * This class handles otherwise unhandled incoming messages and deals with them accordingly
 * 
 * TODO: The way the various different types of request are handled is inconsistent, this should be tidied up
 * 
 * @author ian
 */
public class Dispatcher {

	public static final String VERSION = "$Id: Dispatcher.java 34 2009-03-20 13:27:26Z mathias.demare $";
	private final AbstractUdpSocketManager _usm;
	private final AbstractRoutingTable _rt;
	private final DataStore _ds;
	private final HashStore _hs;
	private final Set _seedNodes;
	private final HashSet _seenUids = new HashSet();
	private static Dispatcher _disp;

	public Dispatcher() {
		this(AbstractFactory.getFactory().getUdpSocketManager(), AbstractFactory.getFactory().getRoutingTable(), DataStore.getDataStore(),
				HashStore.getHashStore(), Dijjer.getDijjer().getSeedNodes());
	}

	public Dispatcher(AbstractUdpSocketManager usm, AbstractRoutingTable rt, DataStore ds, HashStore hs, Set seedNodes) {
		_usm = usm;
		_usm.setDispatcher(this);
		_rt = rt;
		_ds = ds;
		_hs = hs;
		_seedNodes = seedNodes;
	}

	public boolean handleMessage(final Message m) throws Exception {
		_rt.notifyOfContact(m.getSource());
		if (m.getSpec().equals(DMT.ping)) {
			_usm.send(m.getSource(), DMT.createPong(m));
			return true;
		} else if (m.getSpec().equals(DMT.pong)) {
			if (m.isSet(DMT.SEND_TIME)) {
				PacketThrottle.getThrottle(m.getSource()).setRoundTripTime(
						System.currentTimeMillis() - m.getLong(DMT.SEND_TIME));
			}
			if (_rt.getPeerInfo(m.getSource()) == null) {
				_usm.send(m.getSource(), DMT.createWhoAreYou());
			}
			return true;
		} else if (m.getSpec().equals(DMT.introduce)) {
			AbstractPeer externalAddress = (AbstractPeer) m.getObject(DMT.EXTERNAL_ADDRESS);
			int build = m.getInt(DMT.BUILD);
			int firstGoodBuild = m.getInt(DMT.FIRST_GOOD_BUILD);
			_rt.setPeer(externalAddress);
			_rt.setPeerInfo(m.getSource(), new AbstractRoutingTable.PeerInfo(build, firstGoodBuild, externalAddress));
			return true;
		} else if (m.getSpec().equals(DMT.whoAreYou)) {
			_usm.send(m.getSource(), DMT.createIntroduce(m.getSource()));
			return true;
		} else if (m.getSpec().equals(DMT.joinRequest)) {
			final Integer uid = new Integer(m.getInt(DMT.UID));
			if (!_seenUids.contains(uid)) {
				_seenUids.add(uid);
				new AbstractThread() {

					protected boolean loop() throws InterruptedException {
						handleJoinRequest(m);
						_seenUids.remove(uid);
						return false;
					}

					protected void cleanUp() {
					}
				}.startThread();
			} else {
				_usm.send(m.getSource(), DMT.createRejectDueToLoop(m.getInt(DMT.UID)));
			}
			return true;
		} else if (m.getSpec().equals(DMT.requestData)) {
			final Integer uid = new Integer(m.getInt(DMT.UID));
			if (!_seenUids.contains(uid)) {
				_seenUids.add(uid);
				new AbstractThread() {

					protected boolean loop() throws InterruptedException {
						try {
							_usm.send(m.getSource(), DMT.createAcknowledgeRequest(m.getInt(DMT.UID)));
							PartiallyReceivedBlock prb = new PartiallyReceivedBlock(Dijjer.PACKETS_IN_BLOCK,
									Dijjer.PACKET_SIZE);
							retrieveData(new BlockInfo(m), m.getInt(DMT.TTL), m.getSource(), m.getInt(DMT.UID),
									(LinkedList) m.getObject(DMT.FORWARDERS), prb, null);
							_seenUids.remove(uid);
						} catch (Exception e) {
							Logger.warning("Error while retrieving data for another peer", e);
						}
						return false;
					}

					protected void cleanUp() {
					}
				}.startThread();
			} else {
				_usm.send(m.getSource(), DMT.createRejectDueToLoop(m.getInt(DMT.UID)));
			}
			return true;
		} else if (m.getSpec().equals(DMT.requestHash)) {
			final Integer uid = new Integer(m.getInt(DMT.UID));
			if (!_seenUids.contains(uid)) {
				_seenUids.add(uid);
				new AbstractThread() {

					public boolean loop() {
						try {
							_usm.send(m.getSource(), DMT.createRequestHashAck(m.getInt(DMT.UID)));
							VeryLongInteger hash = retrieveHash(new BlockInfo(m), m.getInt(DMT.TTL), m.getInt(DMT.UID),
									true);
							_usm.send(m.getSource(), DMT.createReplyHash(m.getInt(DMT.UID), hash));
							_seenUids.remove(uid);
						} catch (Exception e) {
							Logger.error(e);
							throw new RuntimeException(e);
						}
						return false;
					}
				}.startThread();
			} else {
				_usm.send(m.getSource(), DMT.createRejectDueToLoop(m.getInt(DMT.UID)));
			}
			return true;
		} else if (m.getSpec().equals(DMT.corruptionNotification)) {
			final Integer uid = new Integer(m.getInt(DMT.UID));
			if (!_seenUids.contains(uid)) { // We don't bother with the rejectDueToLoop
				_seenUids.add(uid);
				new AbstractThread() {

					public boolean loop() {
						try {
							handleCorruptionNotification(new BlockInfo(m), m.getInt(DMT.UID), m.getSource(), m
									.getBoolean(DMT.IS_HASH));
							_seenUids.remove(uid);
						} catch (Exception e) {
							Logger.error(e);
							throw new RuntimeException(e);
						}
						return false;
					}
				}.startThread();
			}
			return true;
		} else {
			return false;
		}
	}

	public void registerUid(int uid) {
		_seenUids.add(new Integer(uid));
	}

	public void unregisterUid(int uid) {
		_seenUids.remove(new Integer(uid));
	}

	public boolean retrieveData(BlockInfo bi, int ttl, PartiallyReceivedBlock prb, Download dl) throws IOException,
			RetrievalException {
		int uid = Misc.nextInt();
		registerUid(uid);
		boolean ret = retrieveData(bi, ttl, null, uid, new LinkedList(), prb, dl);
		unregisterUid(uid);
		return ret;
	}

	public boolean retrieveData(final BlockInfo bi, int ttl, final AbstractPeer requestor, final int uid,
			final LinkedList forwarders, final PartiallyReceivedBlock prb, Download download) throws IOException,
			RetrievalException {
		boolean cached = false;
		Logger.info("Dispatcher: Retrieving " + bi + " at ttl " + ttl);
		// See if we have it in our local store
		byte[] data = _ds.getDataForBlock(bi.getHashKey());
		if (data != null) {
			(new InputStreamBlockReceiver(prb, new ByteArrayInputStream(data))).start();
			if (requestor != null) {
				forwardData(requestor, prb, uid, forwarders, true);
			}
			return true;
		}
		AbstractPeer best = null;
		AbstractPeer last = null;
		HashSet exclude = new HashSet(); // We don't want to bother the seed with
		// requests for data
		exclude.addAll(_seedNodes);
		exclude.addAll(forwarders);
		while (true) { // We loop until we get the data or give up
			best = _rt.findClosest(bi.getHashKey(), exclude);
			// If the ttl has run out, there is nowhere to route, or if the next hop is further away in hash-space from
			// the data, we download it ourselves.
			// TODO: Make this robust against download failure
			if ((ttl < 1) || (best == null) || bi.getHashKey().closerTo(_rt.getPeer().getHash(), best.getHash())) {
				HttpBlockReceiver hbr = new HttpBlockReceiver(bi, prb);
				if (requestor != null) {
					new Thread() {

						public void run() {
							forwardData(requestor, prb, uid, forwarders, false);
							/*
							 * Only add the block to the DS if we didn't request it, this helps to prevent the datastore
							 * getting "flushed" every time we do a request
							 */
							try {
								_ds.addDataAsBlock(bi.getHashKey(), prb.getBlock());
							} catch (IOException e) {
								throw new RuntimeException(e);
							}
						}
					}.start();
				}
				hbr.start();
				return false;
			}
			// Ok, we are going to ask another peer for the data, first construct our request
			// And send it to our best guess
			if (!forwarders.contains(_rt.getPeer())) {
				forwarders.add(_rt.getPeer());
			}
			_usm.send(best, DMT.createRequestData(uid, bi, forwarders, ttl - 1));
			// Now we expect an Ack, give it 4 seconds
			Message ar = _usm.waitFor(MessageFilter.create(10000, DMT.acknowledgeRequest).addType(DMT.rejectDueToLoop)
					.setField(DMT.UID, uid));
			if (ar == null) {
				// No response in 10 seconds, flag this guy to be recontacted and continue the while loop to find another
				// or possibly just download it ourselves
				if (best == last) exclude.add(best);
				_rt.recontactPeer(best);
				exclude.add(best);
				continue;
			}
			if (ar.getSpec().equals(DMT.rejectDueToLoop)) {
				Logger.warning("requestData (uid: " + uid + ") rejected by " + best + " due to loop");
				exclude.add(best);
				continue;
			}
			// Got the ack, we give it 5 minutes to account for the time it might take a peer to download it from
			// a slow server
			MessageFilter succFilt = MessageFilter.create(30000, DMT.requestSuccessful).addType(DMT.requestFailed)
					.setSource(best).setField(DMT.UID, uid);
			final Message resp = _usm.waitFor(succFilt);
			if ((resp == null) || (resp.getSpec().equals(DMT.requestFailed))) {
				if (resp == null) {
					if (requestor != null) {
						_usm.send(requestor, DMT.createRequestFailed(uid, RetrievalException.TIMED_OUT,
								"Timed out waiting for requestSuccessful"));
					}
					throw new RetrievalException(RetrievalException.TIMED_OUT,
							"Timed out waiting on requestSuccessful for block " + bi.getBlockNo());
				} else {
					if (requestor != null) {
						_usm.send(requestor, DMT.createRequestFailed(uid, (resp.isSet(DMT.REASON) ? resp
								.getInt(DMT.REASON) : RetrievalException.UNKNOWN), (resp.isSet(DMT.DESCRIPTION) ? resp
								.getString(DMT.DESCRIPTION) : "Unknown")));
					}
					throw new RetrievalException((resp.isSet(DMT.REASON)
							? resp.getInt(DMT.REASON)
							: RetrievalException.UNKNOWN), (resp.isSet(DMT.DESCRIPTION) ? resp
							.getString(DMT.DESCRIPTION) : "Unknown"));
				}
			}
			// Ok, the data has been retrieved and should be on its way
			if (requestor != null) {
				(new Thread() {

					public void run() {
						forwardData(requestor, prb, uid, forwarders, resp.getBoolean(DMT.CACHED));
						if (prb.allReceived()) {
							try {
								_ds.addDataAsBlock(bi.getHashKey(), prb.getBlock());
							} catch (IOException e) {
								Logger.error("Error writing data to datastore", e);
							}
						}
					}
				}).start();
				// We add the new peer to our RT *after* the transfer as this is when it will start trying to contact us
				_rt.addPeer((AbstractPeer) resp.getObject(DMT.DATA_SOURCE));
				// Note that we only add the data to our DS if someone else requested it from us
			}
			BlockReceiver br = new BlockReceiver(_usm, best, uid, prb);
			br.receive();
			return resp.getBoolean(DMT.CACHED);
		}
	}

	protected void forwardData(AbstractPeer dest, PartiallyReceivedBlock prb, int uid, LinkedList forwarders, boolean wasCached) {
		_usm.send(dest, DMT.createRequestSuccessful(uid, _rt.getPeer(), wasCached));
		BlockTransmitter bt = new BlockTransmitter(_usm, dest, uid, prb);
		bt.send();
		synchronized (forwarders) {
			for (Iterator i = forwarders.iterator(); i.hasNext();) {
				AbstractPeer p = (AbstractPeer) i.next();
				_rt.addPeer(p);
			}
		}
	}

	protected void handleJoinRequest(Message jr) {
		int ttl = jr.getInt(DMT.TTL);
		int uid = jr.getInt(DMT.UID);
		AbstractPeer joiner = (AbstractPeer) jr.getObject(DMT.JOINER);
		if (joiner.isNull()) {
			joiner = jr.getSource();
		}
		if (joiner.equals(_rt.getPeer())) {
			Logger.warning("Received join request from self!  Ignoring");
			return;
		}
		_usm.send(jr.getSource(), DMT.createJoinRequestAck(uid));
		HashSet dontConsider = new HashSet();
		// Don't route to the joiner, or the guy who sent it to us
		dontConsider.add(joiner);
		dontConsider.add(jr.getSource());
		LinkedList exclude;
		if (jr.isSet(DMT.EXCLUDE)) {
			exclude = (LinkedList) jr.getObject(DMT.EXCLUDE);
			dontConsider.add(exclude);
		} else {
			exclude = new LinkedList();
		}
		while (true) {
			Logger.info("LOOP: Forward joinRequest");
			AbstractPeer best = _rt.findClosest(joiner.getHash(), dontConsider);
			/*
			 * Terminate request and respond if: 1) TTL has expired 2) We couldn't find anywhere to route to
			 */
			if ((ttl == 0) || (best == null) || ttl < Dijjer.JOINTTL) {
				LinkedList peers = new LinkedList();
				peers.add(_rt.getPeer());
				_rt.addPeer(joiner);
				_usm.send(jr.getSource(), DMT.createJoinResponse(uid, peers));
				break;
			}
			exclude.add(new Integer(AbstractFactory.getFactory().getRoutingTable().getPeer().hashCode()));
			_usm.send(best, DMT.createJoinRequest(uid, joiner, jr.getInt(DMT.TTL) - 1, exclude));
			Message jra = _usm.waitFor(MessageFilter.create(6000, DMT.joinRequestAck).addType(DMT.rejectDueToLoop)
					.setField(DMT.UID, uid));
			if (jra == null) {
				_rt.recontactPeer(best);
				continue;
			}
			if (jra.getSpec().equals(DMT.rejectDueToLoop)) {
				dontConsider.add(best);
				Logger.warning("joinRequest (uid: " + uid + ") rejected by " + jra.getSource() + " due to loop");
				continue;
			}
			Message jrs = _usm.waitFor(MessageFilter.create(1000 * (ttl + 3), DMT.joinResponse).setSource(best)
					.setField(DMT.UID, uid));
			if (jrs == null) {
				LinkedList peers = new LinkedList();
				peers.add(_rt.getPeer());
				_rt.addPeer(joiner);
				_usm.send(jr.getSource(), DMT.createJoinResponse(uid, peers));
				break;
			} else {
				LinkedList peers = (LinkedList) jrs.getObject(DMT.PEERS);
				if (_rt.getPeers().size() < _rt.getMaxSize()) {
					peers.add(_rt.getPeer());
					_rt.addPeer(joiner);
				}
				// If any are null assume they are the sender of the joinResponse
				for (ListIterator i = peers.listIterator(); i.hasNext();) {
					AbstractPeer p = (AbstractPeer) i.next();
					if (p.isNull()) {
						i.remove();
					}
				}
				_usm.send(jr.getSource(), DMT.createJoinResponse(uid, peers));
				break;
			}
		}
	}

	public VeryLongInteger retrieveHash(final BlockInfo bi, int ttl, int uid, boolean cacheResult) throws IOException,
			RetrievalException {
		VeryLongInteger lo = _hs.getHash(bi.getHashHashKey());
		if (lo != null) {
			return lo;
		}
		HashSet exclude = new HashSet(); // We don't want to bother the seeds with
		// requests for hashes
		exclude.addAll(_seedNodes);
		while (true) {
			AbstractPeer best = _rt.findClosest(bi.getHashHashKey(), exclude);
			if ((ttl == 0) || (best == null) || (bi.getHashHashKey().closerTo(_rt.getPeer().getHash(), best.getHash()))) {
				// We are going to download and retrieve it ourselves
				VeryLongInteger dataHash = retrieveHashFromServer(bi);
				_hs.put(bi.getHashHashKey(), dataHash);
				return dataHash;
			}
			// Ok, we are going to try forwarding it to the next closest peer
			_usm.send(best, DMT.createRequestHash(uid, bi, ttl - 1));
			Message requestHashAck = _usm.waitFor(MessageFilter.create(10000, DMT.requestHashAck).addType(
					DMT.rejectDueToLoop).setField(DMT.UID, uid));
			if (requestHashAck == null) {
				exclude.add(best);
				_rt.recontactPeer(best);
				continue;
			}
			if (requestHashAck.getSpec().equals(DMT.rejectDueToLoop)) {
				Logger.warning("requestHash (uid: " + uid + ") rejected by " + requestHashAck.getSource()
						+ " due to loop");
				exclude.add(best);
				continue;
			}
			Message replyHash = _usm.waitFor(MessageFilter.create(30000, DMT.replyHash).setField(DMT.UID, uid));
			if (replyHash == null) {
				Logger.info("Timed out waiting for replyHash from " + best);
				ttl = 0;
				continue;
			}
			VeryLongInteger dataHash = (VeryLongInteger) replyHash.getObject(DMT.HASH);
			if (cacheResult) {
				_hs.put(bi.getHashHashKey(), dataHash);
			}
			return dataHash;
		}
	}

	private void handleCorruptionNotification(BlockInfo bi, int uid, AbstractPeer source, boolean forHash) {
		try {
			// Ok, first lets see if its valid
			if (forHash) {
				VeryLongInteger myHash = _hs.getHash(bi.getHashHashKey());
				if (myHash == null) {
					Logger.info("Ignoring corruptionNotification because we don't have the hash in our HashStore");
					return;
				}
				VeryLongInteger actualHash = retrieveHashFromServer(bi);
				if (actualHash.equals(myHash)) {
					Logger.warning("Received bogus corruptionNotification from " + source + ", dropping from RT");
					LinkedList r = new LinkedList();
					r.add(source);
					_rt.removePeers(r, "You sent a bogus corruptionNotification");
					return;
				}
				_hs.delete(bi.getHashHashKey());
			} else {
				byte[] localData = _ds.getDataForBlock(bi.getHashKey());
				if (localData == null) {
					Logger.info("Ignoring corruptionNotification because we don't have the data in our DataStore");
					return;
				}
				VeryLongInteger myHash = new VeryLongInteger(localData);
				PartiallyReceivedBlock prb = new PartiallyReceivedBlock(Dijjer.PACKETS_IN_BLOCK, Dijjer.PACKET_SIZE);
				(new HttpBlockReceiver(bi, prb)).start();
				VeryLongInteger actualHash = new VeryLongInteger(prb.getBlock());
				if (actualHash.equals(myHash)) {
					Logger.warning("Received bogus corruptionNotification from " + source + ", dropping from RT");
					LinkedList r = new LinkedList();
					r.add(source);
					_rt.removePeers(r, "You sent a bogus corruptionNotification");
					return;
				}
				_ds.delete(bi.getHashKey());
			}
			// It is valid, broadcast it to other peers
			for (Iterator i = ((ArrayList) _rt.getPeers().clone()).iterator(); i.hasNext();) {
				AbstractPeer p = (AbstractPeer) i.next();
				if (!p.equals(source)) {
					_usm.send(p, DMT.createCorruptionNotification(uid, bi, forHash));
				}
			}
		} catch (Exception e) {
			Logger.warning("Exception while processing corruptionNotification", e);
		}
	}

	protected VeryLongInteger retrieveHashFromServer(BlockInfo bi) throws RetrievalException {
		PartiallyReceivedBlock prb = new PartiallyReceivedBlock(Dijjer.PACKETS_IN_BLOCK, Dijjer.PACKET_SIZE);
		HttpBlockReceiver hbr = new HttpBlockReceiver(bi, prb);
		try {
			hbr.start();
		} catch (IOException e) {
			throw new RetrievalException(RetrievalException.IO_ERROR, e.getMessage());
		}
		return new VeryLongInteger(prb.getBlock());
	}

	public AbstractRoutingTable getRoutingTable() {
		return _rt;
	}

	public DataStore getDataStore() {
		return _ds;
	}

	public HashStore getHashStore() {
		return _hs;
	}

	public static void init() {
		_disp = new Dispatcher();
	}

	public static Dispatcher getDispatcher() {
		return _disp;
	}
}